Refactor the parallel job queue to its own module
authorAlex Crichton <alex@alexcrichton.com>
Fri, 11 Jul 2014 15:57:47 +0000 (08:57 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Fri, 11 Jul 2014 15:57:47 +0000 (08:57 -0700)
src/cargo/ops/cargo_rustc/job_queue.rs [new file with mode: 0644]
src/cargo/ops/cargo_rustc/mod.rs

diff --git a/src/cargo/ops/cargo_rustc/job_queue.rs b/src/cargo/ops/cargo_rustc/job_queue.rs
new file mode 100644 (file)
index 0000000..99a21c1
--- /dev/null
@@ -0,0 +1,109 @@
+use std::collections::HashMap;
+use term::color::YELLOW;
+
+use core::Package;
+use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness};
+use util::CargoResult;
+
+use super::job::Job;
+
+pub struct JobQueue<'a, 'b> {
+    pool: TaskPool,
+    queue: DependencyQueue<(&'a Package, Job)>,
+    tx: Sender<Message>,
+    rx: Receiver<Message>,
+    active: HashMap<String, uint>,
+    config: &'b mut Config<'b>,
+}
+
+type Message = (String, Freshness, CargoResult<Vec<Job>>);
+
+impl<'a, 'b> JobQueue<'a, 'b> {
+    pub fn new(config: &'b mut Config<'b>,
+               jobs: Vec<(&'a Package, Freshness, Job)>) -> JobQueue<'a, 'b> {
+        let (tx, rx) = channel();
+        let mut queue = DependencyQueue::new();
+        for &(pkg, _, _) in jobs.iter() {
+            queue.register(pkg);
+        }
+        for (pkg, fresh, job) in jobs.move_iter() {
+            queue.enqueue(pkg, fresh, (pkg, job));
+        }
+
+        JobQueue {
+            pool: TaskPool::new(config.jobs()),
+            queue: queue,
+            tx: tx,
+            rx: rx,
+            active: HashMap::new(),
+            config: config,
+        }
+    }
+
+    /// Execute all jobs necessary to build the dependency graph.
+    ///
+    /// This function will spawn off `config.jobs()` workers to build all of the
+    /// necessary dependencies, in order. Freshness is propagated as far as
+    /// possible along each dependency chain.
+    pub fn execute(&mut self) -> CargoResult<()> {
+        // Iteratively execute the dependency graph. Each turn of this loop will
+        // schedule as much work as possible and then wait for one job to finish,
+        // possibly scheduling more work afterwards.
+        while self.queue.len() > 0 {
+            loop {
+                match self.queue.dequeue() {
+                    Some((name, Fresh, (pkg, _))) => {
+                        assert!(self.active.insert(name.clone(), 1u));
+                        try!(self.config.shell().status("Fresh", pkg));
+                        self.tx.send((name, Fresh, Ok(Vec::new())));
+                    }
+                    Some((name, Dirty, (pkg, job))) => {
+                        assert!(self.active.insert(name.clone(), 1));
+                        try!(self.config.shell().status("Compiling", pkg));
+                        let my_tx = self.tx.clone();
+                        self.pool.execute(proc() my_tx.send((name, Dirty, job.run())));
+                    }
+                    None => break,
+                }
+            }
+
+            // Now that all possible work has been scheduled, wait for a piece
+            // of work to finish. If any package fails to build then we stop
+            // scheduling work as quickly as possibly.
+            let (name, fresh, result) = self.rx.recv();
+            *self.active.get_mut(&name) -= 1;
+            match result {
+                Ok(v) => {
+                    for job in v.move_iter() {
+                        *self.active.get_mut(&name) += 1;
+                        let my_tx = self.tx.clone();
+                        let my_name = name.clone();
+                        self.pool.execute(proc() {
+                            my_tx.send((my_name, fresh, job.run()));
+                        });
+                    }
+                    if *self.active.get(&name) == 0 {
+                        self.active.remove(&name);
+                        self.queue.finish(&name, fresh);
+                    }
+                }
+                Err(e) => {
+                    if *self.active.get(&name) == 0 {
+                        self.active.remove(&name);
+                    }
+                    if self.active.len() > 0 && self.config.jobs() > 1 {
+                        try!(self.config.shell().say(
+                                    "Build failed, waiting for other \
+                                     jobs to finish...", YELLOW));
+                        for _ in self.rx.iter() {}
+                    }
+                    return Err(e)
+                }
+            }
+        }
+
+        log!(5, "rustc jobs completed");
+
+        Ok(())
+    }
+}
index 9d28b09eda0965f1e2ddfc67ac8b2c43c67df4ca..067acd0dd590a886b0edbe12f1e50a7cba909b93 100644 (file)
@@ -1,17 +1,16 @@
-use std::collections::HashMap;
-use term::color::YELLOW;
-
 use core::{Package, PackageSet, Target, Resolve};
 use util;
 use util::{CargoResult, ProcessBuilder, CargoError, human};
-use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness};
+use util::{Config, Freshness};
 
 use self::job::Job;
+use self::job_queue::JobQueue;
 use self::context::Context;
 
-mod job;
 mod context;
 mod fingerprint;
+mod job;
+mod job_queue;
 
 type Args = Vec<String>;
 
@@ -77,7 +76,7 @@ pub fn compile_targets<'a>(env: &str, targets: &[&Target], pkg: &Package,
     try!(compile(targets, pkg, &mut cx, &mut jobs));
 
     // Now that we've figured out everything that we're going to do, do it!
-    execute(cx.config, jobs)
+    JobQueue::new(cx.config, jobs).execute()
 }
 
 fn compile<'a>(targets: &[&Target], pkg: &'a Package, cx: &mut Context,
@@ -276,81 +275,3 @@ fn build_deps_args(dst: &mut Args, package: &Package, cx: &Context) {
                  cx.target_filename(target)));
     }
 }
-
-/// Execute all jobs necessary to build the dependency graph.
-///
-/// This function will spawn off `config.jobs()` workers to build all of the
-/// necessary dependencies, in order. Freshness is propagated as far as possible
-/// along each dependency chain.
-fn execute(config: &mut Config,
-           jobs: Vec<(&Package, Freshness, Job)>) -> CargoResult<()> {
-    let pool = TaskPool::new(config.jobs());
-    let (tx, rx) = channel();
-    let mut queue = DependencyQueue::new();
-    for &(pkg, _, _) in jobs.iter() {
-        queue.register(pkg);
-    }
-    for (pkg, fresh, job) in jobs.move_iter() {
-        queue.enqueue(pkg, fresh, (pkg, job));
-    }
-
-    // Iteratively execute the dependency graph. Each turn of this loop will
-    // schedule as much work as possible and then wait for one job to finish,
-    // possibly scheduling more work afterwards.
-    let mut active = HashMap::new();
-    while queue.len() > 0 {
-        loop {
-            match queue.dequeue() {
-                Some((name, Fresh, (pkg, _))) => {
-                    assert!(active.insert(name.clone(), 1u));
-                    try!(config.shell().status("Fresh", pkg));
-                    tx.send((name, Fresh, Ok(Vec::new())));
-                }
-                Some((name, Dirty, (pkg, job))) => {
-                    assert!(active.insert(name.clone(), 1));
-                    try!(config.shell().status("Compiling", pkg));
-                    let my_tx = tx.clone();
-                    pool.execute(proc() my_tx.send((name, Dirty, job.run())));
-                }
-                None => break,
-            }
-        }
-
-        // Now that all possible work has been scheduled, wait for a piece of
-        // work to finish. If any package fails to build then we stop scheduling
-        // work as quickly as possibly.
-        let (name, fresh, result) = rx.recv();
-        *active.get_mut(&name) -= 1;
-        match result {
-            Ok(v) => {
-                for job in v.move_iter() {
-                    *active.get_mut(&name) += 1;
-                    let my_tx = tx.clone();
-                    let my_name = name.clone();
-                    pool.execute(proc() {
-                        my_tx.send((my_name, fresh, job.run()));
-                    });
-                }
-                if *active.get(&name) == 0 {
-                    active.remove(&name);
-                    queue.finish(&name, fresh);
-                }
-            }
-            Err(e) => {
-                if *active.get(&name) == 0 {
-                    active.remove(&name);
-                }
-                if active.len() > 0 && config.jobs() > 1 {
-                    try!(config.shell().say("Build failed, waiting for other \
-                                             jobs to finish...", YELLOW));
-                    for _ in rx.iter() {}
-                }
-                return Err(e)
-            }
-        }
-    }
-
-    log!(5, "rustc jobs completed");
-
-    Ok(())
-}